package com.three.tools.thread.pool;

import com.three.api.spi.Spi;
import com.three.api.spi.common.ExecutorFactory;
import com.three.config.common.IConfig;
import com.three.exception.PushException;
import com.three.tools.thread.NamedPoolThreadFactory;
import com.three.utils.LogUtils;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.three.tools.thread.ThreadNames.*;

/**
 * 此线程池可伸缩，线程空闲一定时间后回收，新请求重新创建线程
 * Created by mathua on 2017/5/24.
 */
@Spi(order = 1)
public final class DefaultExecutorFactory implements ExecutorFactory {

    private Executor get(ThreadPoolConfig config) {
        String name = config.getName();
        int corePoolSize = config.getCorePoolSize();
        int maxPoolSize = config.getMaxPoolSize();
        int keepAliveSeconds = config.getKeepAliveSeconds();
        BlockingQueue<Runnable> queue = config.getQueue();

        return new DefaultExecutor(corePoolSize
                , maxPoolSize
                , keepAliveSeconds
                , TimeUnit.SECONDS
                , queue
                , new NamedPoolThreadFactory(name)
                , new DumpThreadRejectedHandler(config));
    }

    @Override
    public Executor get(String name) {
        final ThreadPoolConfig config;
        switch (name) {
            case EVENT_BUS:// 用户处理内部事件分发
                config = ThreadPoolConfig
                        .build(T_EVENT_BUS)
                        .setCorePoolSize(IConfig.chess.thread.pool.event_bus.min)
                        .setMaxPoolSize(IConfig.chess.thread.pool.event_bus.max)
                        .setKeepAliveSeconds(TimeUnit.SECONDS.toSeconds(10))
                        .setQueueCapacity(IConfig.chess.thread.pool.event_bus.queue_size)//大量的online，offline，
                        .setRejectedPolicy(ThreadPoolConfig.REJECTED_POLICY_CALLER_RUNS);
                break;
            case MQ:// 用户上下线消息, 踢人等
                config = ThreadPoolConfig
                        .build(T_MQ)
                        .setCorePoolSize(IConfig.chess.thread.pool.mq.min)
                        .setMaxPoolSize(IConfig.chess.thread.pool.mq.max)
                        .setKeepAliveSeconds(TimeUnit.SECONDS.toSeconds(10))
                        .setQueueCapacity(IConfig.chess.thread.pool.mq.queue_size)
                        .setRejectedPolicy(ThreadPoolConfig.REJECTED_POLICY_CALLER_RUNS);
                ;
                break;
            case PUSH_CLIENT: {// 消息推送回调处理，该线程池在客户端运行
                ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(IConfig.chess.thread.pool.push_client
                        , new NamedPoolThreadFactory(T_PUSH_CLIENT_TIMER), (r, e) -> r.run() // run caller thread
                );
                executor.setRemoveOnCancelPolicy(true);
                return executor;
            }
            case PUSH_TASK://消息推送中心，推送任务线程池大小, 如果为0表示使用Gateway Server的work线程池，tcp下推荐0
                return new ScheduledThreadPoolExecutor(IConfig.chess.thread.pool.push_task, new NamedPoolThreadFactory(T_PUSH_CENTER_TIMER),
                        (r, e) -> {
                            throw new PushException("one push task was rejected. task=" + r);
                        }
                );
            case ACK_TIMER: {// 处理ACK消息超时
                ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(IConfig.chess.thread.pool.ack_timer,
                        new NamedPoolThreadFactory(T_ARK_REQ_TIMER),
                        (r, e) -> LogUtils.PUSH.error("one ack context was rejected, context=" + r)
                );
                executor.setRemoveOnCancelPolicy(true);
                return executor;
            }
            default:
                throw new IllegalArgumentException("no executor for " + name);
        }

        return get(config);
    }
}
